# archived/inference-benchmarking/benchmarking/runner.py
from concurrent import futures
from datetime import datetime
import json
import logging
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
import pandas as pd
from sagemaker.jumpstart.model import JumpStartModel
from sagemaker.predictor import Predictor
from sagemaker.predictor import retrieve_default
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.session import Session
from sagemaker.utils import name_from_base
from sagemaker import image_uris
from sagemaker.model import Model
from benchmarking.clients import PricingClient
from benchmarking.clients import SageMakerClient
from benchmarking.concurrency_probe import num_invocation_scaler
from benchmarking.concurrency_probe import ConcurrentProbeIteratorBase
from benchmarking.concurrency_probe import ConcurrentProbeExponentialScalingIterator
from benchmarking.constants import (
    MAX_CONCURRENT_BENCHMARKS,
    MAX_CONCURRENT_INVOCATIONS_PER_MODEL,
    MAX_TOTAL_RETRY_TIME_SECONDS,
    NUM_INVOCATIONS,
    RETRY_WAIT_TIME_SECONDS,
    SAVE_METRICS_FILE_PATH,
    SM_SESSION,
)
from benchmarking.load_test import LoadTester
from benchmarking.logging import logging_prefix
from benchmarking.custom_predictor import CustomPredictor


class Benchmarker:
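    """Benchmark SageMaker-hosted model endpoints for latency, throughput, concurrency, and price.

    Illustrative usage (a minimal sketch; the payload and model argument values below are hypothetical,
    and the exact payload schema depends on the serving container being benchmarked):

        benchmarker = Benchmarker(
            payloads={"short-prompt": {"inputs": "Hello", "parameters": {"max_new_tokens": 64}}},
            run_concurrency_probe=True,
        )
        output = benchmarker.run_multiple_models(
            models={"my-model": {"endpoint_name": "my-existing-endpoint", "huggingface_model_id": "org/model"}},
        )
    """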
def __init__(
self,
payloads: Dict[str, Dict[str, Any]],
max_concurrent_benchmarks: int = MAX_CONCURRENT_BENCHMARKS,
sagemaker_session: Session = SM_SESSION,
num_invocations: int = NUM_INVOCATIONS,
max_workers: int = MAX_CONCURRENT_INVOCATIONS_PER_MODEL,
retry_wait_time: float = RETRY_WAIT_TIME_SECONDS,
max_total_retry_time: float = MAX_TOTAL_RETRY_TIME_SECONDS,
run_latency_load_test: bool = False,
run_throughput_load_test: bool = False,
run_concurrency_probe: bool = False,
concurrency_probe_num_invocation_hook: Optional[Callable[[int], int]] = None,
concurrency_probe_concurrent_request_iterator_cls: Optional[Type[ConcurrentProbeIteratorBase]] = None,
clean_up: bool = False,
attempt_retrieve_predictor: bool = True,
saved_metrics_path: Path = SAVE_METRICS_FILE_PATH,
):
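        """Configure the benchmarker.

        Args:
            payloads: Mapping of payload name to request payload; every model is benchmarked against each payload.
            max_concurrent_benchmarks: Number of models benchmarked in parallel by `run_multiple_models`.
            sagemaker_session: Session used for deploying models and attaching to endpoints.
            num_invocations: Invocations per latency/throughput load test.
            max_workers: Concurrent invocations per model during the throughput load test.
            retry_wait_time / max_total_retry_time: Retry timing configuration, in seconds.
            run_latency_load_test / run_throughput_load_test / run_concurrency_probe: Toggles for the
                individual benchmark types (all default to False).
            concurrency_probe_num_invocation_hook: Maps a concurrent request count to the number of
                invocations for the concurrency probe; defaults to `num_invocation_scaler`.
            concurrency_probe_concurrent_request_iterator_cls: Iterator class controlling how the probe
                scales concurrent requests; defaults to `ConcurrentProbeExponentialScalingIterator`.
            clean_up: Delete each model and endpoint after its benchmark completes.
            attempt_retrieve_predictor: Reuse endpoint names recorded in `saved_metrics_path` from a
                previous run instead of redeploying.
            saved_metrics_path: Path of the JSON metrics file written by `run_multiple_models`.
        """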
self.payloads = payloads
self.max_concurrent_benchmarks = max_concurrent_benchmarks
self.sagemaker_session = sagemaker_session
self.num_invocations = num_invocations
self.max_workers = max_workers
self.retry_wait_time = retry_wait_time
self.max_total_retry_time = max_total_retry_time
self.run_latency_load_test = run_latency_load_test
self.run_throughput_load_test = run_throughput_load_test
self.run_concurrency_probe = run_concurrency_probe
if concurrency_probe_num_invocation_hook is None:
self.concurrency_probe_num_invocation_hook = num_invocation_scaler
else:
self.concurrency_probe_num_invocation_hook = concurrency_probe_num_invocation_hook
if concurrency_probe_concurrent_request_iterator_cls is None:
self.concurrency_probe_concurrent_request_iterator_cls = ConcurrentProbeExponentialScalingIterator
else:
self.concurrency_probe_concurrent_request_iterator_cls = concurrency_probe_concurrent_request_iterator_cls
self.clean_up = clean_up
self._pricing_client = PricingClient()
self._sagemaker_client = SageMakerClient()
self.model_id_to_endpoint_name: Dict[str, str] = {}
if attempt_retrieve_predictor:
self.model_id_to_endpoint_name = self.load_metrics_json(saved_metrics_path).get("endpoints", {})
def _run_benchmarking_tests(
self,
predictor: CustomPredictor,
payload: Dict[str, Any],
model_id: str,
payload_name: str,
tokenizer_model_id: str,
huggingface_hub_token: Optional[str] = None,
) -> Dict[str, Any]:
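        """Collect pricing, deployment-time, and load-test metrics for a single (model, payload) pair.

        When the predictor wraps a SageMaker endpoint, the instance type, instance count, and deployment
        timings are read from the endpoint description; otherwise defaults are assumed so pricing can
        still be estimated for an arbitrary endpoint URL.
        """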
metrics_latency: Dict[str, Any] = {}
metrics_throughput: Dict[str, Any] = {}
metrics_concurrency: Dict[str, Any] = {}
if predictor.predictor is not None:
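            # Predictor is backed by a SageMaker endpoint: derive pricing and deployment timing
            # from the endpoint, endpoint-config, and model descriptions.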
endpoint_description = self._sagemaker_client.describe_endpoint(predictor.endpoint_name)
endpoint_config_name = endpoint_description["EndpointConfigName"]
endpoint_config_description = self._sagemaker_client.describe_endpoint_config(endpoint_config_name)
            model_description = self._sagemaker_client.describe_model(predictor.endpoint_name)
            production_variant = endpoint_config_description["ProductionVariants"][0]
            # NOTE: model_description["PrimaryContainer"] is not currently propagated into the metrics,
            # so the "PrimaryContainer" meta path in load_metrics_pandas only resolves when present
            # (errors="ignore" covers the missing case).
instance_type = production_variant["InstanceType"]
price_per_instance = self._pricing_client.get_price_per_unit(instance_type, SM_SESSION._region_name)
price_per_endpoint = production_variant["InitialInstanceCount"] * price_per_instance
metrics_pricing = {
"PricePerInstance": price_per_instance,
"PricePerEndpoint": price_per_endpoint,
}
creation_time: datetime = endpoint_description["CreationTime"]
last_modified_time: datetime = endpoint_description["LastModifiedTime"]
metrics_time = {
"CreationTime": creation_time.isoformat(),
"LastModifiedTime": last_modified_time.isoformat(),
"DeploymentTime": (last_modified_time - creation_time).seconds,
}
else:
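            # Predictor wraps a raw endpoint URL: no endpoint description is available, so fall back to
            # any instance metadata on the predictor (or documented defaults) and synthesize a
            # ProductionVariant-shaped record for consistent reporting downstream.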
if hasattr(predictor, "instance_type") and predictor.instance_type is not None:
instance_type = predictor.instance_type
else:
logging.info(
f"{logging_prefix(model_id)} No instance type provided. Using the default ml.g5.2xlarge for pricing calculations."
)
instance_type = "ml.g5.2xlarge"
if hasattr(predictor, "instance_count") and predictor.instance_count is not None:
initial_instance_count = predictor.instance_count
else:
logging.info(
f"{logging_prefix(model_id)} No initial_instance_count provided. Using the default count 1."
)
initial_instance_count = 1
price_per_instance = self._pricing_client.get_price_per_unit(instance_type, SM_SESSION._region_name)
price_per_endpoint = initial_instance_count * price_per_instance
metrics_pricing = {
"PricePerInstance": price_per_instance,
"PricePerEndpoint": price_per_endpoint,
}
metrics_time = {
"CreationTime": 0,
"LastModifiedTime": 0,
"DeploymentTime": 0,
}
production_variant = {
"VariantName": "AllTraffic",
"ModelName": predictor.endpoint_name,
"InitialInstanceCount": initial_instance_count,
"InstanceType": instance_type,
"InitialVariantWeight": 1.0,
"ModelDataDownloadTimeoutInSeconds": 3600,
"ContainerStartupHealthCheckTimeoutInSeconds": 3600,
}
tester = LoadTester(
predictor,
payload,
model_id,
payload_name,
tokenizer_model_id,
huggingface_hub_token,
price_per_endpoint,
)
if self.run_latency_load_test:
metrics_latency = tester.run_latency_load_test(self.num_invocations)
if self.run_throughput_load_test:
metrics_throughput = tester.run_throughput_load_test(self.num_invocations, self.max_workers)
if self.run_concurrency_probe:
concurrency_probe_results = tester.run_concurrency_probe(
iterator_cls=self.concurrency_probe_concurrent_request_iterator_cls,
num_invocation_hook=self.concurrency_probe_num_invocation_hook,
)
metrics_concurrency = {"ConcurrencyProbe": concurrency_probe_results}
return {
**metrics_latency,
**metrics_throughput,
**metrics_concurrency,
**metrics_pricing,
**metrics_time,
"ProductionVariant": production_variant,
}
def run_single_predictor(
self,
model_id: str,
predictor: CustomPredictor,
tokenizer_model_id: Optional[str] = None,
huggingface_hub_token: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Run benchmarker given a Predictor for an in-service model endpoint."""
metrics = []
try:
for payload_name, payload in self.payloads.items():
metrics_payload = self._run_benchmarking_tests(
predictor,
payload,
model_id,
payload_name,
tokenizer_model_id,
huggingface_hub_token,
)
metrics.append(metrics_payload)
finally:
            if self.clean_up:
self.clean_up_predictor(model_id, predictor)
else:
logging.info(f"{logging_prefix(model_id)} Skipping cleaning up resources ...")
return metrics
def run_single_model(
self, model_id: str, model_args: Dict[str, Any]
) -> Tuple[List[Dict[str, Any]], CustomPredictor]:
"""Run benchmarker for a single model.
        If an `endpoint_name` is provided, either as a key in `model_args` or saved in the benchmarking
        metrics file from a previous invocation of this benchmarker, the benchmarker attempts to attach a
        predictor to that endpoint. If no `endpoint_name` is provided, the model is deployed before the
        benchmarking run.
"""
endpoint_name = model_args.get("endpoint_name") or self.model_id_to_endpoint_name.get(model_id)
endpoint_url = model_args.get("endpoint_url")
instance_type = model_args.get("instance_type")
if endpoint_url is not None:
predictor = CustomPredictor(endpoint_url=endpoint_url, instance_type=instance_type)
elif endpoint_name is not None:
try:
predictor = self.retrieve_predictor_from_endpoint(endpoint_name, model_args)
predictor = CustomPredictor(predictor=predictor)
logging.info(f"{logging_prefix(model_id)} Predictor successfully retrieved from endpoint name")
except Exception as e:
logging.warning(f"{logging_prefix(model_id)} Failed to retrieve predictor, re-deploying model: {e}")
predictor = self.deploy_model(model_id, model_args)
predictor = CustomPredictor(predictor=predictor)
else:
predictor = self.deploy_model(model_id, model_args)
predictor = CustomPredictor(predictor=predictor)
self.model_id_to_endpoint_name[model_id] = predictor.endpoint_name
metrics = self.run_single_predictor(
model_id,
predictor,
model_args["huggingface_model_id"],
model_args.get("huggingface_hub_token"),
)
return metrics, predictor
def retrieve_predictor_from_endpoint(
self, endpoint_name: str, model_args: Optional[Dict[str, Any]] = None
) -> Predictor:
"""Obtain a predictor from an already deployed endpoint."""
if model_args is not None:
jumpstart_model_args: Dict[str, Any] = model_args.get("jumpstart_model_args")
if jumpstart_model_args:
return retrieve_default(
endpoint_name=endpoint_name,
model_id=jumpstart_model_args["model_id"],
model_version=jumpstart_model_args.get("model_version", "*"),
sagemaker_session=self.sagemaker_session,
)
return Predictor(
endpoint_name=endpoint_name,
sagemaker_session=self.sagemaker_session,
serializer=JSONSerializer(),
deserializer=JSONDeserializer(),
)
def deploy_model(self, model_id: str, model_args: Dict[str, Any]) -> Predictor:
"""Deploy a model with configuration defined by model_args.
        Two model deployment methods are supported:
        - Use a JumpStartModel object with kwargs defined in the `jumpstart_model_specs` key.
        - Use a Model object with `image_uri_args`, `model_args`, and `deploy_args` kwargs defined in the `model_specs` key.
        Raises:
            ValueError: if neither the `jumpstart_model_specs` nor the `model_specs` key is present in model_args.
"""
jumpstart_model_specs: Optional[Dict[str, Any]] = model_args.get("jumpstart_model_specs")
model_specs: Optional[Dict[str, Any]] = model_args.get("model_specs")
endpoint_name = name_from_base(f"bm-{model_id.replace('huggingface', 'hf')}")
logging.info(f"{logging_prefix(model_id)} Deploying endpoint {endpoint_name} ...")
if jumpstart_model_specs:
model = JumpStartModel(
sagemaker_session=self.sagemaker_session,
**jumpstart_model_specs["model_args"],
)
return model.deploy(
endpoint_name=endpoint_name,
**jumpstart_model_specs.get("deploy_args", {}),
)
elif model_specs:
image_uri = image_uris.retrieve(region=SM_SESSION._region_name, **model_specs["image_uri_args"])
model = Model(
image_uri=image_uri,
role=SM_SESSION.get_caller_identity_arn(),
predictor_cls=Predictor,
name=endpoint_name,
**model_specs["model_args"],
)
return model.deploy(
serializer=JSONSerializer(),
deserializer=JSONDeserializer(),
endpoint_name=endpoint_name,
**model_specs["deploy_args"],
)
else:
raise ValueError(f"{logging_prefix(model_id)} No model arguments discovered for deployment.")
def run_multiple_models(
self,
models: Dict[str, Dict[str, Any]],
save_file_path: Path = SAVE_METRICS_FILE_PATH,
) -> Dict[str, Any]:
"""Concurrently call run_single_model for all models and aggregate benchmarking output."""
metrics = []
errors = {}
endpoints: Dict[str, str] = {}
with futures.ThreadPoolExecutor(max_workers=self.max_concurrent_benchmarks) as executor:
future_to_model_id = {
executor.submit(self.run_single_model, model_id, args): model_id for model_id, args in models.items()
}
for future in futures.as_completed(future_to_model_id):
model_id = future_to_model_id[future]
try:
metrics_model_id, predictor = future.result()
endpoints[model_id] = predictor.endpoint_name
metrics.extend(metrics_model_id)
except Exception as e:
errors[model_id] = e
logging.error(f"{logging_prefix(model_id)} Benchmarking failed: {e}")
output = {
"models": models,
"payloads": self.payloads,
"endpoints": endpoints,
"metrics": metrics,
}
with open(save_file_path, "w") as file:
json.dump(output, file, indent=4, ensure_ascii=False)
return output
@classmethod
def load_metrics_pandas(cls, save_file_path: Path = SAVE_METRICS_FILE_PATH) -> pd.DataFrame:
"""Create a pandas DataFrame from the saved JSON metrics file."""
metrics = cls.load_metrics_json(save_file_path)
return pd.json_normalize(
data=metrics,
record_path=["metrics", "ConcurrencyProbe"],
meta=[
["metrics", "ProductionVariant", "InstanceType"],
["metrics", "ProductionVariant", "InitialInstanceCount"],
["metrics", "PrimaryContainer", "Image"],
["metrics", "PricePerEndpoint"],
["metrics", "PricePerInstance"],
["metrics", "DeploymentTime"],
],
errors="ignore",
)
@staticmethod
def create_concurrency_probe_pivot_table(
df: pd.DataFrame,
value_format_dict: Optional[Dict[str, Callable]] = None,
value_name_dict: Optional[Dict[str, str]] = None,
fillna_str: str = "--",
) -> pd.DataFrame:
"""Pivot concurrency probe pandas DataFrame to show specified values across models and concurrent requests."""
if value_format_dict is None:
value_format_dict = {
"TokenThroughput": "{:.2f}".format,
"LatencyPerToken.p90": int,
"CostToGenerate1MTokens": "${:,.2f}".format,
}
if value_name_dict is None:
value_name_dict = {
"LatencyPerToken.p90": "p90 latency (ms/token)",
"TokenThroughput": "throughput (tokens/s)",
"CostToGenerate1MTokens": "cost to generate 1M tokens ($)",
}
df_copy = df.copy()
index_cols = [
"ModelID",
"metrics.ProductionVariant.InstanceType",
"PayloadName",
]
columns_cols = ["ConcurrentRequests"]
value_cols = value_format_dict.keys()
for value_name, mapping_function in value_format_dict.items():
df_copy[value_name] = df_copy[value_name].map(mapping_function)
df_pivot = df_copy.pivot(index=index_cols, columns=columns_cols, values=value_cols).fillna(fillna_str)
df_pivot = df_pivot.rename(columns=value_name_dict)
df_pivot.index = df_pivot.index.rename(["model ID", "instance type", "payload"])
df_pivot.columns = df_pivot.columns.rename([None, "concurrent requests"])
return df_pivot
def clean_up_resources(self) -> None:
"""Delete model and endpoint for all endpoints attached to this benchmarker."""
for model_id, endpoint_name in self.model_id_to_endpoint_name.items():
predictor = self.retrieve_predictor_from_endpoint(endpoint_name)
self.clean_up_predictor(model_id, predictor)
@classmethod
def clean_up_predictor(cls, model_id: str, predictor: CustomPredictor) -> None:
"""Delete model and endpoint for a single predictor."""
logging.info(f"{logging_prefix(model_id)} Cleaning up resources ...")
predictor.delete_model()
predictor.delete_endpoint()
@staticmethod
def load_metrics_json(
save_file_path: Path = SAVE_METRICS_FILE_PATH,
    ) -> Dict[str, Any]:
        """Attempt to load metrics from a previous benchmarking run, returning an empty dict on failure."""
        try:
            with open(save_file_path, "r") as f:
                data = json.load(f)
        except Exception as e:
            logging.warning(f"Failed to load the saved benchmarking metrics file: {e}")
return {}
return data